package com.ydl.learning.flink.demo

import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

/**
 * 创建dataSet的几种方式
 *
 * @author ydl
 * @since 2020/10/10
 */
object DataSetDemo {
  /**
   * 从collection中创建
   */
  def fromCollection(env: ExecutionEnvironment): Unit = {

    val inclusive = 1 to 10
    val unit = env.fromCollection(inclusive)
    unit.print()
  }

  /**
   * 从文件中创建
   */
  def fromTextFile(env: ExecutionEnvironment): Unit = {

    //读文件
    //    val unit = env.readTextFile("F:\\flink.txt")
    //    val value = unit.setParallelism(1)
    //    value.print()

    //读文件夹  自动识别文件夹 但是不能识别文件嵌套
    val unit = env.readTextFile("F:\\input")
    val value = unit.setParallelism(1)
    value.print()

  }

  /**
   * 从csv文件中创建
   *
   * @param env
   */
  def fromCSVFile(env: ExecutionEnvironment): Unit = {


    //读csv文件   csv文件长度和类型必须一致和    (Int,Int,Int)  一致                                   F:\\input\\student.csv
    val unit = env.readCsvFile[(Int, Int, Int)]("file:///F:\\input\\student.csv", ignoreFirstLine = true)
    val value = unit.setParallelism(1)
    value.print()

    println("+++++++++++++++++++++++")
    //Tuple形式
    val unit1 = env.readCsvFile[(Int, Int)]("file:///F:\\input\\student.csv", ignoreFirstLine = true, includedFields = Array[Int](0, 2))
    unit1.print()
    //java  pojo形式
    println("+++++++++++++++++++++++")
    //val unit2 = env.readCsvFile[MyClass]("file:///F:\\input\\student.csv",ignoreFirstLine = true,pojoFields =  Array("a","b"))

    //unit2.print()
    println("+++++++++++++++++++++++")
    //case class 方式
    val unit3 = env.readCsvFile[Myclas1]("file:///F:\\input\\student.csv", ignoreFirstLine = true, includedFields = Array(0, 2))

    unit3.print()

  }

  case class Myclas1(a: Int, b: Int)


  /**
   * 递归读取文件夹下文件创建dataset
   *
   * @param env
   */
  def fromReFile(env: ExecutionEnvironment): Unit = {
    val unit1 = env.readTextFile("file:///F:\\input")
    unit1.print()

    println("_________________________________")

    val configuration = new Configuration()
    configuration.setBoolean("recursive.file.enumeration", true)
    val unit = env.readTextFile("file:///F:\\input").withParameters(configuration)
    unit.print()
  }


  /**
   * 从压缩文件中读取
   *
   * 读压缩文件
   * Flink目前支持输入文件的透明解压缩，如果它们标有适当的文件扩展名。
   * 特别是，这意味着不需要进一步配置输入格式，并且任何FileInputFormat支持压缩，
   * 包括自定义输入格式。请注意，压缩文件可能无法并行读取，从而影响作业可伸缩性。
   *
   * 下表列出了当前支持的压缩方法。
   *
   *
   *
   * 压缩方法	  文件扩展名				  可并行
   * DEFLATE	  .deflate	 	     	 没有
   * GZip压缩	  .gz， .gzip	         没有
   * bzip2的	  .bz2	                 没有
   * XZ	.xz	             		     没有
   *
   *
   *
   */
  def fromCompressionFile(env: ExecutionEnvironment): Unit = {
    val unit1 = env.readTextFile("file:///F:\\input")
    unit1.print()

    println("_________________________________")

    val configuration = new Configuration()
    configuration.setBoolean("recursive.file.enumeration", true)
    val unit = env.readTextFile("file:///F:\\input").withParameters(configuration)
    unit.print()
  }
}
